# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib, copy
import numpy as np
from hysop.tools.htypes import check_instance, to_tuple
from hysop.topology.topology_descriptor import TopologyDescriptor
from hysop.topology.cartesian_topology import CartesianTopology
from hysop.tools.parameters import CartesianDiscretization
from hysop.constants import Backend, BoundaryCondition
from hysop.fields.continuous_field import Field
from hysop.tools.numpywrappers import npw
class CartesianTopologyDescriptor(TopologyDescriptor):
    """
    Describes how a CartesianTopology topology should be built.

    A descriptor wraps a ghost-free CartesianDiscretization together with
    the parameters handled by TopologyDescriptor (mpi_params, domain,
    backend and backend specific keyword arguments). The remaining free
    parameters (cutdirs and ghosts) are supplied later on, when
    create_topology() is called.
    """

    __slots__ = (
        "_mpi_params",
        "_domain",
        "_backend",
        "_extra_kwds",
        "_cartesian_discretization",
        "_space_step",
    )

    def __init__(self, mpi_params, domain, backend, cartesian_discretization, **kwds):
        """
        Initialize a CartesianTopologyDescriptor.

        Parameters
        ----------
        mpi_params, domain, backend:
            Forwarded as is to TopologyDescriptor.__init__.
        cartesian_discretization: CartesianDiscretization
            Discretization of the domain, it should not contain any ghosts.
        kwds:
            Extra keyword arguments forwarded to TopologyDescriptor.__init__.

        Raises
        ------
        ValueError
            If the discretization contains at least one ghost.
        AssertionError
            If grid and global resolutions are not consistent with the left
            periodicity, or if a direction is periodic on one side only.

        Notes
        -----
        kwds allows for backend specific variables.
        CartesianTopologyDescriptor is immutable.
        """
        super().__init__(mpi_params=mpi_params, domain=domain, backend=backend, **kwds)
        check_instance(cartesian_discretization, CartesianDiscretization)

        # check cartesian_discretization
        if (cartesian_discretization.ghosts > 0).any():
            msg = "No ghost allowed for a topology descriptor."
            raise ValueError(msg)

        global_resolution = cartesian_discretization.global_resolution
        grid_resolution = cartesian_discretization.grid_resolution
        lboundaries = cartesian_discretization.lboundaries
        rboundaries = cartesian_discretization.rboundaries

        check_instance(grid_resolution, np.ndarray, size=domain.dim, minval=2)
        check_instance(global_resolution, np.ndarray, size=domain.dim, minval=2)
        check_instance(
            lboundaries,
            npw.ndarray,
            dtype=object,
            size=domain.dim,
            values=BoundaryCondition,
            allow_none=True,
        )
        check_instance(
            rboundaries,
            npw.ndarray,
            dtype=object,
            size=domain.dim,
            values=BoundaryCondition,
            allow_none=True,
        )

        is_lperiodic = lboundaries == BoundaryCondition.PERIODIC
        is_rperiodic = rboundaries == BoundaryCondition.PERIODIC
        # The global resolution has to be the grid resolution plus one point
        # in every left-periodic direction.
        assert all((grid_resolution + is_lperiodic) == global_resolution)
        msg = "Invalid boundary conditions {} vs {}."
        msg = msg.format(lboundaries, rboundaries)
        # A direction is either periodic on both sides or on none of them.
        assert not (is_lperiodic ^ is_rperiodic).any(), msg

        # compute space step
        space_step = npw.asrealarray(domain.length / (global_resolution - 1))
        npw.set_readonly(space_step)

        self._cartesian_discretization = cartesian_discretization
        self._space_step = space_step

    @property
    def global_resolution(self):
        """Get the global resolution of the discretization (logical grid size)."""
        return self._cartesian_discretization.global_resolution

    @property
    def grid_resolution(self):
        """Get the global grid resolution of the discretization (effective grid size)."""
        return self._cartesian_discretization.grid_resolution

    @property
    def lboundaries(self):
        """Get the left boundaries."""
        return self._cartesian_discretization.lboundaries

    @property
    def rboundaries(self):
        """Get the right boundaries."""
        return self._cartesian_discretization.rboundaries

    @property
    def boundaries(self):
        """Get left and right boundaries as a (lboundaries, rboundaries) tuple."""
        return (
            self._cartesian_discretization.lboundaries,
            self._cartesian_discretization.rboundaries,
        )

    @property
    def space_step(self):
        """Get the space step (read-only array computed at initialization)."""
        return self._space_step

    def match(self, other, invert=False):
        """
        Test if this descriptor is equivalent to the other one.

        Returns NotImplemented when the parent class comparison returns
        NotImplemented or when other is not a CartesianTopologyDescriptor.
        Otherwise the parent result is combined with the equality of the two
        cartesian discretizations, and negated when invert is set.
        """
        eq = super().match(other, invert=False)
        if (eq is NotImplemented) or (
            not isinstance(other, CartesianTopologyDescriptor)
        ):
            return NotImplemented
        eq &= self._cartesian_discretization == other._cartesian_discretization
        return (not eq) if invert else eq

    def __eq__(self, other):
        return self.match(other)

    def __ne__(self, other):
        return self.match(other, invert=True)

    def __lt__(self, other):
        # Descriptors that differ are ordered by their string representation.
        return (self != other) and (str(self) < str(other))

    def __hash__(self):
        # hash(super(...)) does not work as expected so we call __hash__ directly
        h = super().__hash__()
        h ^= hash(self._cartesian_discretization)
        return h

    def __str__(self):
        # Each direction is summarized as "<left>/<right>", using the first
        # three characters of the boundary names without "HOMOGENEOUS_".
        bc = ",".join(
            "{}/{}".format(
                str(lb).replace("HOMOGENEOUS_", "")[:3],
                str(rb).replace("HOMOGENEOUS_", "")[:3],
            )
            for (lb, rb) in zip(*self.boundaries)
        )
        return ":CartesianTopologyDescriptor: backend={}, domain={}, grid_resolution={}, bc=[{}]".format(
            self.backend,
            self.domain.full_tag,
            self.grid_resolution,
            bc,
        )

    @classmethod
    def build_descriptor(cls, backend, operator, field, handle, **kwds):
        """
        Build a CartesianTopologyDescriptor from a handle.

        Parameters
        ----------
        backend: Backend
        operator: ComputationalGraphOperator
            Provides the default mpi_params of the descriptor.
        field: Field
            Provides the boundary conditions and the default domain.
        handle:
            An instance of one of the CartesianTopologyDescriptors types.
            A tuple, list, numpy array or CartesianDiscretization is
            converted to a new CartesianTopologyDescriptor, any other
            accepted handle is returned unchanged.
        kwds:
            Extra keyword arguments passed to the descriptor constructor,
            'mpi_params' and 'domain' are only set when they are missing.

        Raises
        ------
        RuntimeError
            If a descriptor has to be built and operator has no mpi_params.
        ValueError
            If handle is a CartesianDiscretization that contains ghosts or
            boundary conditions.
        """
        # Imported locally, as in the original code (NOTE(review): presumably
        # to avoid a circular import, to be confirmed).
        from hysop.core.graph.computational_operator import ComputationalGraphOperator

        check_instance(backend, Backend)
        check_instance(operator, ComputationalGraphOperator)
        check_instance(field, Field)
        check_instance(handle, CartesianTopologyDescriptors)

        if not isinstance(handle, (tuple, list, np.ndarray, CartesianDiscretization)):
            # handle is either an existing CartesianTopologyDescriptor or a
            # CartesianTopology instance (ghosts and boundary conditions can
            # be imposed freely by user here): it is returned as is.
            return handle

        if not hasattr(operator, "mpi_params"):
            msg = f"mpi_params has not been set in operator {operator.name}."
            raise RuntimeError(msg)

        if isinstance(handle, CartesianDiscretization):
            if handle.ghosts.sum() > 0:
                msg = (
                    "A CartesianTopology topology descriptor should not contain any ghosts, "
                    "they will be determined during the get_field_requirements() in the "
                    " operator initialization step to minimize the number of topologies created."
                    "\nIf you want to impose a specific topology, you can directly pass a "
                    "CartesianTopology instance into operator's input or output variables "
                    "dictionnary instead."
                )
                raise ValueError(msg)
            if (handle.lboundaries is not None) or (handle.rboundaries is not None):
                msg = (
                    "A CartesianTopology topology descriptor should not contain any boundary conditions, "
                    "they will be automatically determined from continuous fields."
                )
                raise ValueError(msg)
            global_resolution = handle.resolution
        else:
            # tuple, list or numpy array: the handle is the resolution itself
            global_resolution = handle

        # Boundary conditions always come from the continuous field.
        cartesian_discretization = CartesianDiscretization(
            resolution=global_resolution,
            lboundaries=field.lboundaries_kind,
            rboundaries=field.rboundaries_kind,
            ghosts=None,
        )

        kwds.setdefault("mpi_params", operator.mpi_params)
        kwds.setdefault("domain", field.domain)

        return CartesianTopologyDescriptor(
            backend=backend,
            cartesian_discretization=cartesian_discretization,
            **kwds,
        )

    def choose_topology(self, known_topologies, **kwds):
        """
        Find optimal topology parameters from known_topologies.
        If None is returned, create_topology will be called instead.

        The chosen topology is the one with the smallest sum of ghosts,
        the first one encountered wins in case of a tie.
        """
        if not known_topologies:
            return None
        # min() returns the first minimal item, exactly like taking the first
        # element of a (stable) sort, without sorting the whole collection.
        return min(known_topologies, key=lambda topo: sum(topo.ghosts))

    def create_topology(self, cutdirs, ghosts):
        """
        Build a topology with the current TopologyDescriptor.
        Free parameters are cutdirs and ghosts which are imposed
        by operators on variables and solved during operator's
        method get_field_requirements().
        """
        discretization = CartesianDiscretization(
            resolution=self.grid_resolution,
            lboundaries=self.lboundaries,
            rboundaries=self.rboundaries,
            ghosts=ghosts,
        )
        return CartesianTopology(
            domain=self.domain,
            discretization=discretization,
            mpi_params=self.mpi_params,
            cutdirs=cutdirs,
            backend=self.backend,
            **self.extra_kwds,
        )
# Tuple of the types accepted as a cartesian topology descriptor handle.
# It is used as the type specification of the check_instance() calls made in
# CartesianTopologyDescriptor.build_descriptor() and in
# get_topo_descriptor_discretization().
# NOTE(review): type(None) is listed, so None is presumably accepted by those
# checks as well -- confirm against check_instance().
CartesianTopologyDescriptors = (
CartesianTopology,
CartesianTopologyDescriptor,
CartesianDiscretization,
tuple,
list,
np.ndarray,
type(None),
)
"""
Instance of those types can be used to create a CartesianTopologyDescriptor.
Thus they can be passed in the variables of each operator supporting
CartesianTopology topologies.
"""
def get_topo_descriptor_discretization(td):
    """
    Get grid resolution from any type of CartesianTopologyDescriptor.

    td should be None or an instance of one of the
    CartesianTopologyDescriptors types. None is returned for None.
    Objects exposing a grid resolution (CartesianTopology,
    CartesianTopologyDescriptor and CartesianDiscretization) contribute their
    grid_resolution attribute, any other accepted value is considered to be
    the resolution itself. The resolution is returned through to_tuple().
    """
    check_instance(td, CartesianTopologyDescriptors, allow_none=True)
    if td is None:
        return None

    # Types that carry their resolution in a 'grid_resolution' attribute.
    resolution_holders = (
        CartesianTopology,
        CartesianTopologyDescriptor,
        CartesianDiscretization,
    )
    resolution = td.grid_resolution if isinstance(td, resolution_holders) else td
    return to_tuple(resolution)